package com.cat.paxos.proposer;

import com.cat.paxos.acceptor.Acceptor;
import com.cat.paxos.acceptor.ReceiveProposalResponse;
import com.cat.paxos.proposal.ObjectProposal;
import com.cat.paxos.proposal.Proposal;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

/**
 * {@link Proposer} implementation for plain {@code Object} payloads.
 *
 * <p>Each operation contacts every registered {@link Acceptor} through a parallel stream and
 * hands the collected answers to an {@link AcceptorStrategy}, which makes the actual decision
 * (next proposal number, outcome of a commit). An acceptor call that throws is treated as
 * "no answer" and never aborts the round on its own.
 *
 * @author Cat.wang
 * @date 2020/1/15 17:15
 */
public class ObjectProposer implements Proposer {

    /** Acceptors contacted by this proposer; starts empty and is exposed as-is by {@link #acceptors()}. */
    private List<Acceptor> acceptors = new ArrayList<>();

    /** Decision policy applied to the answers gathered from the acceptors. */
    private AcceptorStrategy strategy = new HalfUpAcceptorStrategy();

    /**
     * Builds a proposal for {@code payload}.
     *
     * <p>Every acceptor is asked for its proposal number; the strategy derives the number of the
     * new proposal from the acceptor count and those answers.
     *
     * @param payload value to wrap in the proposal
     * @return a new {@link ObjectProposal} carrying the decided number and {@code payload}
     * @throws PrepareProposalException if the strategy or the proposal construction throws; the
     *                                  original exception is attached as the cause
     */
    @Override
    public Proposal<Object> prepare(Object payload) throws PrepareProposalException {
        try {
            String decidedNumber = strategy.decisionProposalNumber(
                    this.acceptors.size(),
                    this.acceptors.parallelStream()
                            .map(acceptor -> {
                                try {
                                    return acceptor.getProposalNumber();
                                } catch (Exception ignored) {
                                    // A failing acceptor contributes a null entry; nulls are
                                    // NOT filtered here, so the strategy sees one slot per acceptor.
                                    return null;
                                }
                            })
                            .collect(Collectors.toList()));
            return new ObjectProposal(decidedNumber, payload);
        } catch (Exception cause) {
            throw new PrepareProposalException(cause.getMessage(), cause);
        }
    }

    /**
     * Commits {@code proposal} in two stages.
     *
     * <ol>
     *   <li>Every acceptor receives the proposal. Acceptors whose {@code receive} call returned
     *       without throwing are remembered, and the non-null responses are judged by the
     *       strategy.</li>
     *   <li>If that verdict is ok, the remembered acceptors are asked to confirm
     *       {@code proposal.number()} and the strategy produces the final response from the
     *       non-null confirmations. Otherwise a response mirroring the verdict's proposal number
     *       and ok flag is returned.</li>
     * </ol>
     *
     * @param proposal proposal to commit
     * @return the strategy's final response, or a response mirroring the first-stage verdict when
     *         that verdict was not ok
     * @throws CommitProposalException if anything outside the per-acceptor calls throws; the
     *                                 original exception is attached as the cause
     */
    @Override
    public CommitProposalResponse commit(Proposal proposal) throws CommitProposalException {
        try {
            // Acceptors awaiting confirmation. Filled from inside a parallel stream,
            // hence the concurrent list implementation.
            final List<Acceptor> pendingConfirmation = new CopyOnWriteArrayList<>();

            final CommitWaitConfirmProposalResponse receiveVerdict =
                    strategy.decisionCommitWaitConfirmProposal(
                            this.acceptors.size(),
                            this.acceptors.parallelStream()
                                    .map(acceptor -> receiveAndTrack(acceptor, proposal, pendingConfirmation))
                                    .filter(Objects::nonNull)
                                    .collect(Collectors.toList()));

            if (!receiveVerdict.ok()) {
                // Snapshot the verdict once so the returned response is a fixed value.
                final String verdictNumber = receiveVerdict.proposalNumber();
                final boolean verdictOk = receiveVerdict.ok();
                return new CommitProposalResponse() {

                    @Override
                    public String proposalNumber() {
                        return verdictNumber;
                    }

                    @Override
                    public boolean ok() {
                        return verdictOk;
                    }
                };
            }

            return strategy.decisionCommitConfirmProposal(
                    this.acceptors.size(),
                    pendingConfirmation.parallelStream()
                            .map(acceptor -> {
                                try {
                                    return acceptor.confirm(proposal.number());
                                } catch (Exception ignored) {
                                    // Failed confirmation counts as no answer.
                                    return null;
                                }
                            })
                            .filter(Objects::nonNull)
                            .collect(Collectors.toList()));
        } catch (Exception cause) {
            throw new CommitProposalException(cause.getMessage(), cause);
        }
    }

    /**
     * Sends {@code proposal} to one acceptor and, if the call returns, records the acceptor in
     * {@code pendingConfirmation}.
     *
     * @return the acceptor's response, or {@code null} if the call (or the bookkeeping) threw
     */
    private ReceiveProposalResponse receiveAndTrack(Acceptor acceptor,
                                                    Proposal proposal,
                                                    List<Acceptor> pendingConfirmation) {
        try {
            ReceiveProposalResponse received = acceptor.receive(proposal);
            pendingConfirmation.add(acceptor);
            return received;
        } catch (Exception ignored) {
            // Best effort: an unreachable or failing acceptor simply yields no response.
            return null;
        }
    }

    /**
     * Returns the internal acceptor list itself, not a copy, so changes made by the caller are
     * seen by this proposer.
     */
    @Override
    public List<Acceptor> acceptors() {
        return this.acceptors;
    }

    /** Returns the strategy used to evaluate acceptor answers. */
    @Override
    public AcceptorStrategy strategy() {
        return this.strategy;
    }
}
